
Library for publishing events to multiple consumers using asynchronous streams

The library provides the EventStreams<T: 'static + Send + Sync> object, which broadcasts events of type T to an arbitrary number of EventStream objects, each of which implements the standard [futures::Stream] interface.

Usage sample

use futures::{executor::LocalPool, task::LocalSpawnExt, StreamExt};
use async_event_streams::EventStreams;

let mut pool = LocalPool::new();

let streams = EventStreams::new();
let mut stream = streams.create_event_stream();

let sender_task = async move {
    assert!(streams.count() == 1);
    streams.send_event(42, None).await;
    streams.send_event(451, None).await;
    streams.send_event(1984, None).await;
};

let receiver_task = async move {
    let mut values = Vec::new();
    while let Some(event) = stream.next().await {
        values.push(*event);
    }
    // next() returns None once 'streams' is dropped (here: when sender_task finishes)
    assert!(values == vec![42, 451, 1984]);
};

pool.spawner().spawn_local(sender_task).unwrap();
pool.spawner().spawn_local(receiver_task).unwrap();
pool.run();

Event processing order

When an event is put into EventStreams, it becomes immediately available to all EventStream objects created by that EventStreams. Each stream yields events in exactly the order in which they were sent.

Since receivers work in an asynchronous environment, streams may be drained unevenly. For example, if events 1, 2, 3, 4, 5 are put into EventStreams, one EventStream subscriber could process all five events while another is still waiting for the first.

Sometimes this is undesirable, so the library implements a mechanism to guarantee that every subscriber has handled event ‘1’ before event ‘2’ is sent.

To achieve this, the send_event function returns a SentEvent future. Each EventStream instance receives a clone of an Event<T> object, and all of these clones wrap the same event instance. Subscribers get these Event instances and may hold them as long as they need. The SentEvent future completes only when all instances of this Event are dropped. Awaiting it before sending the next event guarantees that the previous event has been processed by all subscribers.

If such blocking is not necessary, post_event can be used instead.
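The release rule above can be modelled with nothing but the standard library. The following is a minimal sketch, not the crate's implementation: SharedEvent, Delivery, send and release_demo are hypothetical names standing in for Event<T>, SentEvent and send_event, and the asynchronous waiting is replaced by a plain is_released check.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for `Event<T>`: every clone shares one payload.
struct SharedEvent<T>(Arc<T>);

impl<T> Clone for SharedEvent<T> {
    fn clone(&self) -> Self {
        SharedEvent(Arc::clone(&self.0))
    }
}

/// Hypothetical stand-in for `SentEvent`: observes the payload without keeping it alive.
struct Delivery<T>(Weak<T>);

impl<T> Delivery<T> {
    /// True once every `SharedEvent` clone has been dropped.
    fn is_released(&self) -> bool {
        self.0.strong_count() == 0
    }
}

/// Pushes one clone of the event into each subscriber queue and returns the watcher.
fn send<T>(value: T, queues: &mut [Vec<SharedEvent<T>>]) -> Delivery<T> {
    let event = SharedEvent(Arc::new(value));
    let delivery = Delivery(Arc::downgrade(&event.0));
    for queue in queues.iter_mut() {
        queue.push(event.clone());
    }
    // The local `event` is dropped on return; only the queued clones remain.
    delivery
}

/// Two subscribers: the delivery is released only after both drop their clone.
fn release_demo() -> (bool, bool, bool) {
    let mut queues: Vec<Vec<SharedEvent<i32>>> = vec![Vec::new(), Vec::new()];
    let delivery = send(42, &mut queues);
    let before = delivery.is_released();
    queues[0].clear(); // first subscriber is done
    let after_one = delivery.is_released();
    queues[1].clear(); // second subscriber is done
    let after_all = delivery.is_released();
    (before, after_one, after_all)
}
```

In this model a slow subscriber delays the release simply by keeping its clone, which is the behaviour the text attributes to holding an Event instance.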

Dependent Events

Received events may cause new events to be fired. For example, a mouse button click handler sends mouse click events, and these clicks cause GUI buttons to send button press events. It may be important to guarantee that button press events are not handled in an order different from the order of the mouse clicks.

For example, consider two buttons A and B. Click C1 causes button A to send press P1, and click C2 causes button B to send press P2. It’s guaranteed that P2 is sent after P1 (because P1 is a reaction to C1, P2 is a reaction to C2, and both C1 and C2 come from the same send_event sequence). But there is still no guarantee that P2 is processed after P1, because P1 and P2 are sent by different send_event calls, so the blocking mechanism described above doesn’t help.

This may cause problems. For example, the user clicks the “Apply” button and then the “Close” button in a dialog, but the press event from “Close” arrives earlier than the one from “Apply”. The “Close” handler destroys the dialog, “Apply” is never processed, and the user’s data is lost.

To avoid this, send_event and post_event take an additional optional parameter, source: the event that caused the one being sent. A reference to this ‘source’ event is saved inside the Event wrapper of the new event, and therefore the source’s send_event is blocked until all derived events are dropped. So, in the example above, sending the second click event is delayed until the “Apply” handler (which holds the first click event) finishes.
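The effect of the source parameter can be illustrated with plain reference counting. This is a sketch under assumptions, not the crate's code: Linked, emit and source_link_demo are hypothetical names, and the wrapper stores only a label plus an optional keep-alive link to the event that caused it.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical event wrapper: a label plus an optional link to the causing event.
struct Linked {
    name: &'static str,
    _source: Option<Arc<Linked>>, // keeps the source event alive
}

/// Creates an event; passing `source` stores a clone of the source's `Arc`.
fn emit(name: &'static str, source: Option<&Arc<Linked>>) -> Arc<Linked> {
    Arc::new(Linked { name, _source: source.cloned() })
}

/// A click causes a press; the click stays alive until the press is dropped.
fn source_link_demo() -> (&'static str, bool, bool) {
    let click = emit("click C1", None);
    let click_watch: Weak<Linked> = Arc::downgrade(&click);

    let press = emit("press P1", Some(&click));
    let press_name = press.name;

    drop(click); // the click handler itself has finished with the click
    let released_while_press_alive = click_watch.strong_count() == 0;

    drop(press); // the press handler has finished
    let released_after_press = click_watch.strong_count() == 0;

    (press_name, released_while_press_alive, released_after_press)
}
```

Because the derived press holds the click, anything waiting for the click to be fully dropped also waits for the press handler, which is what delays the second click in the “Apply”/“Close” scenario.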

Event sources, sinks and pipes

Some operations on event streams recur often. An object may generate events of different types (EventSource) and react to events (EventSink). An event source can be connected to an event sink by spawning an asynchronous task with spawn_event_pipe.
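The idea of a pipe can be sketched synchronously with the standard library alone. The names below (RefSink, run_pipe, Collect, pipe_demo) are hypothetical and the crate's real traits are asynchronous, so this shows only the shape of the data flow: shared events are read from a source and handed to the sink by reference.

```rust
use std::sync::Arc;

/// Hypothetical synchronous stand-in for an event sink.
trait RefSink<E> {
    fn on_event_ref(&mut self, event: &E);
}

/// Drains `source` and forwards every shared event to `sink` by reference.
fn run_pipe<E, S: RefSink<E>>(source: impl IntoIterator<Item = Arc<E>>, sink: &mut S) {
    for event in source {
        // The event is shared with other readers, so only `&E` is handed out.
        sink.on_event_ref(&event);
    }
}

/// Example sink that records every value it sees.
struct Collect(Vec<i32>);

impl RefSink<i32> for Collect {
    fn on_event_ref(&mut self, event: &i32) {
        self.0.push(*event);
    }
}

fn pipe_demo() -> Vec<i32> {
    let events: Vec<Arc<i32>> = vec![Arc::new(42), Arc::new(451), Arc::new(1984)];
    let mut sink = Collect(Vec::new());
    run_pipe(events, &mut sink);
    sink.0
}
```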

Structs

Reference-counting container holding an event. Each crate::EventStream instance receives a clone of Event<T> referencing the same instance of T. When all instances of Event<T> are dropped, the crate::SentEvent future returned by send_event completes.

Clonable container holding an event instance while hiding the event’s type. The Event object is just a wrapper over Arc<EventBox> with the event type specified.

Asynchronous stream of events of the specified type. The stream’s next() method returns Some(Event<EVT>) while the source crate::EventStreams object is alive and None once it is destroyed.

Main object, which allows sending events and subscribing to them.

Future returned by send_event. Awaiting it blocks until all instances of crate::Event sent to EventStream objects by this send_event are dropped.

Traits

Standardized interface for objects reacting to events of a specific type. The trait has two methods: on_event_owned, which accepts an owned event object, and on_event_ref, which accepts a borrowed reference to the event. Both methods are expected to behave identically. However, a sink sometimes needs to retransmit the received event, and in that case it is more efficient to handle the owned case separately from the borrowed one.

If the event object implements the ToOwned trait (note that every Clone type implements it), the EventSink implementation can be simplified by implementing the helper EventSinkExt, which has a single event handler accepting a std::borrow::Cow parameter instead of separate handlers for the owned and borrowed cases.

Standardized interface for structures providing a stream of events of the specified type.
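The owned/borrowed split and the Cow-based shortcut described for the sink traits above can be sketched as follows. This is a synchronous illustration with hypothetical names (CowSink, Relay, cow_demo), and it does not reproduce the crate's actual trait signatures: a single Cow handler serves both entry points, and a clone happens only when a borrowed event has to be retransmitted.

```rust
use std::borrow::Cow;

/// Hypothetical helper trait: one handler covers both owned and borrowed events.
trait CowSink<E: Clone> {
    fn on_event(&mut self, event: Cow<'_, E>);

    /// Owned entry point: the event is passed along without cloning.
    fn on_event_owned(&mut self, event: E) {
        self.on_event(Cow::Owned(event));
    }

    /// Borrowed entry point: the handler clones only if it needs ownership.
    fn on_event_ref(&mut self, event: &E) {
        self.on_event(Cow::Borrowed(event));
    }
}

/// Example sink that retransmits events and counts how many had to be cloned.
struct Relay {
    forwarded: Vec<String>,
    clones: usize,
}

impl CowSink<String> for Relay {
    fn on_event(&mut self, event: Cow<'_, String>) {
        if matches!(event, Cow::Borrowed(_)) {
            self.clones += 1; // into_owned() below will clone in this case
        }
        self.forwarded.push(event.into_owned());
    }
}

fn cow_demo() -> (Vec<String>, usize) {
    let mut relay = Relay { forwarded: Vec::new(), clones: 0 };
    relay.on_event_owned("apply".to_string());
    let close = "close".to_string();
    relay.on_event_ref(&close);
    (relay.forwarded, relay.clones)
}
```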

Functions

Connects an EventSource to an EventSink: runs an asynchronous task which reads events from the source and calls EventSink::on_event_ref on the sink object. The source may provide events to multiple readers, so only references to events are available from it.

Same as spawn_event_pipe, but also returns a handle to the task spawned by [futures::task::SpawnExt::spawn_with_handle].